package org.codehaus.activemq.transport.tcp;

import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.BoundedChannel;
import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.InetAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.AbstractTransportChannel;
import org.codehaus.activemq.util.JMSExceptionHelper;

public class TcpTransportChannel extends AbstractTransportChannel implements Runnable {
	private static final int SOCKET_BUFFER_SIZE = 65536;
	private static final int SO_TIMEOUT = 5000;
	private static final Log log = LogFactory.getLog(TcpTransportChannel.class);
	protected Socket socket;
	private WireFormat wireFormat;
	private DataOutputStream dataOut;
	private DataInputStream dataIn;
	private SynchronizedBoolean closed;
	private SynchronizedBoolean started;
	private Object outboundLock;
	private Executor executor;
	private Thread thread;
	private boolean useAsyncSend = false;
	private boolean serverSide = false;
	private BoundedChannel exceptionsList;

	protected TcpTransportChannel(WireFormat wireFormat) {
		this.wireFormat = wireFormat;
		this.closed = new SynchronizedBoolean(false);
		this.started = new SynchronizedBoolean(false);

		this.exceptionsList = new BoundedLinkedQueue(10);
		this.outboundLock = new Object();
		if (this.useAsyncSend)
			this.executor = new PooledExecutor(new BoundedBuffer(1000), 1);
	}

	public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
		this(wireFormat);
		try {
			this.socket = createSocket(remoteLocation);
			this.socket.setReceiveBufferSize(65536);
			this.socket.setSendBufferSize(65536);
			BufferedInputStream buffIn = new BufferedInputStream(this.socket.getInputStream());
			this.dataIn = new DataInputStream(buffIn);
			TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(this.socket.getOutputStream());
			this.dataOut = new DataOutputStream(buffOut);
		} catch (Exception ioe) {
			throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
		}
	}

	public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException {
		this(wireFormat);
		try {
			this.socket = createSocket(remoteLocation, localLocation);
			this.socket.setReceiveBufferSize(65536);
			this.socket.setSendBufferSize(65536);
			BufferedInputStream buffIn = new BufferedInputStream(this.socket.getInputStream());
			this.dataIn = new DataInputStream(buffIn);
			TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(this.socket.getOutputStream());
			this.dataOut = new DataOutputStream(buffOut);
		} catch (Exception ioe) {
			throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
		}
	}

	public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
		this(wireFormat);
		this.socket = socket;
		this.executor = executor;
		this.serverSide = true;
		try {
			socket.setReceiveBufferSize(65536);
			socket.setSendBufferSize(65536);
			TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream());
			this.dataOut = new DataOutputStream(buffOut);
			BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream());
			this.dataIn = new DataInputStream(buffIn);
		} catch (IOException ioe) {
			throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
		}
	}

	public void stop() {
		if (this.closed.commit(false, true)) {
			super.stop();
			try {
				stopExecutor(this.executor);
				this.dataOut.close();
				this.dataIn.close();
				this.socket.close();
			} catch (Exception e) {
				log.warn("Caught while closing: " + e + ". Now Closed", e);
			}
		}
	}

	public void start() throws JMSException {
		if (this.started.commit(false, true)) {
			this.thread = new Thread(this, "Thread:" + toString());
			this.thread.setDaemon(true);
			if (!this.serverSide) {
				this.thread.setPriority(7);
			}
			this.thread.start();
		}
	}

	public void asyncSend(Packet packet) throws JMSException {
		if (this.executor != null) {
			try {
				this.executor.execute(new Runnable() {
					private final Packet val$packet=packet;

					public void run() {
						try {
							if (!TcpTransportChannel.this.closed.get())
								TcpTransportChannel.this.doAsyncSend(this.val$packet);
						} catch (JMSException e) {
							try {
								TcpTransportChannel.this.exceptionsList.put(e);
							} catch (InterruptedException e1) {
								TcpTransportChannel.log.warn("Failed to add element to exception list: " + e1);
							}
						}
					}
				});
			} catch (InterruptedException e) {
				log.info("Caught: " + e, e);
			}
			try {
				JMSException e = (JMSException) this.exceptionsList.poll(0L);
				if (e != null)
					throw e;
			} catch (InterruptedException e1) {
				log.warn("Failed to remove element to exception list: " + e1);
			}
		} else {
			doAsyncSend(packet);
		}
	}

	public boolean isMulticast() {
		return false;
	}

	public void run() {
		log.trace("TCP consumer thread starting");
		int count = 0;
		while (!this.closed.get()) {
			if (this.serverSide) {
				count++;
				if (count > 500) {
					count = 0;
					Thread.yield();
				}
			}
			int type = 0;
			try {
				this.socket.setSoTimeout(5000);
				while ((type = this.dataIn.read()) == 0)
					;
				if (type == -1) {
					log.trace("The socket peer is now closed");

					stop();
				} else {
					this.socket.setSoTimeout(0);

					Packet packet = this.wireFormat.readPacket(type, this.dataIn);
					if (packet != null) {
						doConsumePacket(packet);
					}

				}

			} catch (SocketTimeoutException ste) {
			} catch (InterruptedIOException ioe) {
			} catch (IOException e) {
				doClose(e);
			}
		}
	}

	public String toString() {
		return "TcpTransportChannel: " + this.socket;
	}

	protected void doAsyncSend(Packet packet) throws JMSException {
		try {
			synchronized (this.outboundLock) {
				this.wireFormat.writePacket(packet, this.dataOut);
				this.dataOut.flush();
			}
		} catch (IOException e) {
			throw JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
		}
	}

	private void doClose(Exception ex) {
		if (!this.closed.get()) {
			onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
			stop();
		}
	}

	protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException {
		return new Socket(remoteLocation.getHost(), remoteLocation.getPort());
	}

	protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
		return new Socket(remoteLocation.getHost(), remoteLocation.getPort(),
				InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
	}
}